Custom data source


#1

In a similar vein to the Seahorse operations SDK, is there a facility for implementing custom data sources in Seahorse (e.g. an S3 data source)?


#2

I implemented this with a DOperation0To1 using the SDK, but with large data my execution gets stuck.


#3

@rabe-jens-iwes I suspect this is the problem:

DataFrameReportGenerator.scala#L73

Regardless of the report type generated, it does a full table scan to count the rows. I'm currently trying to figure out a hack to report only the schema.
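To illustrate why the count is the expensive part: `count()` is a Spark action, so it forces evaluation of the whole dataset, whereas schema inspection is metadata-only and scans nothing. A plain-Scala analogy (no Spark involved) using a `LazyList`, which similarly evaluates elements only when forced:

```scala
// A LazyList evaluates elements only on demand, much like a Spark
// DataFrame's lazy plan. (Scala 2.13+ for LazyList.)
var rowsScanned = 0
val rows = LazyList.tabulate(1000) { i => rowsScanned += 1; i }

// Nothing has been scanned yet -- analogous to reading only the schema.
assert(rowsScanned == 0)

// Counting, like DataFrame.count(), must traverse every element.
assert(rows.size == 1000)
assert(rowsScanned == 1000)
```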


#4

@rabe-jens-iwes I can confirm that using the following to wrap the DataFrame produced by your DOperation0To1 does not do a full table scan:

import ai.deepsense.commons.types.ColumnType
import ai.deepsense.deeplang.doperables.dataframe.DataFrame
import ai.deepsense.deeplang.doperables.dataframe.report.DataFrameReportGenerator
import ai.deepsense.deeplang.doperables.dataframe.report.DataFrameReportGenerator.ReportContentName
import ai.deepsense.deeplang.doperables.report.Report
import ai.deepsense.reportlib.model.{ReportContent, ReportType, Table}
import org.apache.spark.sql
import org.apache.spark.sql.types.StructType

class SimpleReportDataFrame(
    override val sparkDataFrame: sql.DataFrame,
    override val schema: Option[StructType])
  extends DataFrame() {
  override def report: Report = {
    val tables = schema.map(schemaTable).toSeq
    Report(ReportContent(
      ReportContentName,
      ReportType.DataFrameSimplified,
      tables
    ))
  }

  def schemaTable(schema: StructType): Table = {
    val values = schema.fields.zipWithIndex.map {
      case (field, index) =>
        val columnName = field.name
        val columnType = field.dataType.simpleString
        List(Some(index.toString), Some(columnName), Some(columnType))
    }.toList

    Table(
      DataFrameReportGenerator.DataSchemaTableName,
      "Preview of columns and their types in dataset",
      Some(List("Column index", "Column name", "Column type")),
      List(ColumnType.numeric, ColumnType.string, ColumnType.string),
      None,
      values)
  }
}
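As a sanity check, the row-building logic in `schemaTable` can be exercised without a Spark runtime; here is a minimal standalone sketch where a plain case class stands in for Spark's `StructField` (the `Field` class and `schemaRows` name are purely illustrative, not SDK API):

```scala
case class Field(name: String, dataType: String)

// Mirrors the zipWithIndex mapping in schemaTable above: each schema
// field becomes a (index, name, type) row for the report table.
def schemaRows(fields: Seq[Field]): List[List[Option[String]]] =
  fields.zipWithIndex.map { case (f, i) =>
    List(Some(i.toString), Some(f.name), Some(f.dataType))
  }.toList

val rows = schemaRows(Seq(Field("id", "bigint"), Field("name", "string")))
assert(rows.head == List(Some("0"), Some("id"), Some("bigint")))
assert(rows(1) == List(Some("1"), Some("name"), Some("string")))
```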

#5

Looks promising. I’ll give it a shot.

However, I don’t remember seeing the count in my Spark UI.


#6

OK, I gave it a shot.

I have this workflow (I blurred things that are internal to my company):

It now runs up to the “Sort” block, where it gets stuck again.

I looked up the UUIDs in my Spark job overview and they are the IDs of the “Filter Rows”, “Filter Columns” and “Sort” nodes. So after the “Sort” completes, no new jobs are scheduled and it stalls.

I think internally Seahorse is waiting for something, and during the waiting, some timeout is hit which stops any further execution.


#7

@rabe-jens-iwes That’s good to know. I would have expected the count to show up in the Spark UI for the original data source, from the report generated when DataFrame.fromSparkDataFrame is called. I did some more digging: “Filter Rows” and “Filter Columns” are TransformerAsOperations, which I’m guessing delegate report generation to the Transformer (though I’m not 100% sure). That report does not seem to invoke any actions. So even if you had changed the report type in your data source, the filters would not have invoked an action either.
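This matches Spark’s execution model: transformations such as filter and select are lazy and schedule no jobs until an action runs. A plain-Scala analogy using a collection view (the counter and names here are just illustrative):

```scala
var evaluated = 0
val pipeline = (1 to 100).view
  .filter { x => evaluated += 1; x % 2 == 0 } // lazy, like "Filter Rows"
  .map(_ * 10)                                // lazy, like a projection

// Building the pipeline evaluates nothing -- no job would be scheduled.
assert(evaluated == 0)

// A terminal operation (the "action") triggers the actual scan.
val result = pipeline.toList
assert(result.size == 50)
assert(evaluated == 100)
```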

FWIW, I logged a feature request that, where applicable, action-invoking report generation can optionally be disabled:


#8

Cool. I also opened #14 about the problem of workflows getting stuck: